Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Managed Transforms API #31495

Open
wants to merge 18 commits into
base: master
Choose a base branch
from

Conversation

ahmedabu98
Copy link
Contributor

@ahmedabu98 ahmedabu98 commented Jun 4, 2024

Adding Python API for Managed transforms, similar to the Java API.

@ahmedabu98 ahmedabu98 marked this pull request as draft June 4, 2024 15:17
Copy link

codecov bot commented Jun 17, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 71.14%. Comparing base (bb4ad99) to head (a3067dc).
Report is 100 commits behind head on master.

Current head a3067dc differs from pull request most recent head 0296f60

Please upload reports for the commit 0296f60 to get more accurate results.

Additional details and impacted files
@@            Coverage Diff            @@
##             master   #31495   +/-   ##
=========================================
  Coverage     71.14%   71.14%           
  Complexity     3008     3008           
=========================================
  Files          1055     1055           
  Lines        133439   133439           
  Branches       3248     3248           
=========================================
  Hits          94929    94929           
  Misses        35382    35382           
  Partials       3128     3128           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@ahmedabu98 ahmedabu98 marked this pull request as ready for review June 18, 2024 18:34
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@ahmedabu98
Copy link
Contributor Author

ahmedabu98 commented Jul 15, 2024

Successful Dataflow run (2024-07-15_13_54_11-2004515317250011573) with the following code:

import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)

options = PipelineOptions([
    "--runner=DataflowRunner",
    "--job_name=managed-iceberg-read-demo",
    "--project=apache-beam-testing",
    "--temp_location=gs://apache-beam-testing-ahmedabualsaud/tmp",
    "--region=us-central1",
    "--sdk_location=sdks/python/dist/apache_beam-2.59.0.dev0.tar.gz",
    "--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest"
])

with beam.Pipeline(options=options) as p:
    p | beam.managed.Read(
        "iceberg",
        config={
            "table": "my_database.my_table",
            "catalog_name": "ahmed_catalog",
            "catalog_properties": {
                "catalog-impl": "org.apache.iceberg.hadoop.HadoopCatalog",
                "warehouse": "gs://ahmedabualsaud-apache-beam-testing"
            }}) | "Log rows" >> beam.Map(_LOGGER.info)

Note: I had to include the following dependencies (needed by Iceberg) in the expansion service jar:

runtimeOnly library.java.hadoop_client
runtimeOnly library.java.bigdataoss_gcsio
runtimeOnly library.java.bigdataoss_gcs_connector
runtimeOnly library.java.bigdataoss_util_hadoop

Copy link
Contributor

@tvalentyn tvalentyn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some comments but Cham would have more context.

sdks/python/apache_beam/transforms/managed.py Outdated Show resolved Hide resolved
inline :class:`dict` like so::

results = p | beam.managed.Read(
beam.managed.ICEBERG,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious about the boundaries for the 'managed' scope.

  1. Is it always a source/sink, or potentially any transform?
  2. is the transform being managed is named 'Read'? if so, why I wonder if beam.managed.ICEBERG should be part of a config instead of a separate argument here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1) It can be any transform.

(2) API wise, I think it's cleaner to separate these. Basically what the user is saying is, "I want to use the Managed Iceberg transform and use this config to configure it". The config can be available locally or remotely. Also, this make it easy to identify the primary function of a transform looking at a pipeline definition.

sdks/python/apache_beam/transforms/managed.py Show resolved Hide resolved
sdks/python/apache_beam/transforms/managed.py Outdated Show resolved Hide resolved
sdks/python/apache_beam/transforms/managed.py Outdated Show resolved Hide resolved
sdks/python/apache_beam/transforms/managed.py Outdated Show resolved Hide resolved
Copy link
Contributor

@tvalentyn tvalentyn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some comments but Cham would be have more context.

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

inline :class:`dict` like so::

results = p | beam.managed.Read(
beam.managed.ICEBERG,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1) It can be any transform.

(2) API wise, I think it's cleaner to separate these. Basically what the user is saying is, "I want to use the Managed Iceberg transform and use this config to configure it". The config can be available locally or remotely. Also, this make it easy to identify the primary function of a transform looking at a pipeline definition.

sdks/python/apache_beam/transforms/managed.py Outdated Show resolved Hide resolved
sdks/python/apache_beam/transforms/managed.py Outdated Show resolved Hide resolved
sdks/python/apache_beam/transforms/managed.py Outdated Show resolved Hide resolved
sdks/python/apache_beam/transforms/managed.py Show resolved Hide resolved
sdks/python/apache_beam/transforms/managed.py Show resolved Hide resolved
self._yaml_config = yaml.dump(config)
self._config_url = config_url

def expand(self, input):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some unit-test coverage using simple pipelines ?

@chamikaramj
Copy link
Contributor

LGTM. We can merge after existing comments are addressed.

@liferoad liferoad added this to the 2.60.0 Release milestone Aug 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants